{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "(model-serving-get-started)=\n",
    "# Model serving graph\n",
    "\n",
    "\n",
    "**In this section**\n",
    "- [Serving Functions](#serving-functions)\n",
    "- [Topology](#topology)\n",
    "- [Remote execution](#remote-execution)\n",
    "- [Examples of graph functionality](#examples)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Serving Functions\n",
    "\n",
    "To start using a serving graph, you first need a serving function. A serving function contains the serving\n",
    "class code to run the model and all the code necessary to run the tasks. MLRun comes with a wide library of tasks. If you\n",
    "use just those, you don't have to add any special code to the serving function, you just have to provide\n",
    "the code that runs the model. For more information about serving classes see {ref}`custom-model-serving-class`.\n",
    "\n",
    "For example, the following code is a basic model serving class:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "# mlrun: start-code"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "from cloudpickle import load\n",
    "from typing import List\n",
    "import numpy as np\n",
    "\n",
    "import mlrun\n",
    "\n",
    "\n",
    "class ClassifierModel(mlrun.serving.V2ModelServer):\n",
    "    def load(self):\n",
    "        \"\"\"load and initialize the model and/or other elements\"\"\"\n",
    "        model_file, extra_data = self.get_model(\".pkl\")\n",
    "        self.model = load(open(model_file, \"rb\"))\n",
    "\n",
    "    def predict(self, body: dict) -> List:\n",
    "        \"\"\"Generate model predictions from sample.\"\"\"\n",
    "        feats = np.asarray(body[\"inputs\"])\n",
    "        result: np.ndarray = self.model.predict(feats)\n",
    "        return result.tolist()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "# mlrun: end-code"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To obtain the serving function, use the `code_to_function` and specify `kind` to be `serving`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "fn = mlrun.code_to_function(\"serving_example\", kind=\"serving\", image=\"mlrun/mlrun\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Topology"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Router\n",
    "Once you have a serving function, you need to choose the graph topology. The default is `router` topology. With the `router` topology you can specify different machine learning models. Each model has a logical name. This name is used to route to the correct model when calling the serving function."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "> 2021-11-02 04:18:36,925 [info] model model1 was loaded\n",
      "> 2021-11-02 04:18:36,926 [info] Initializing endpoint records\n",
      "> 2021-11-02 04:18:36,965 [info] Loaded ['model1']\n",
      "{'id': '6bd11e864805484ea888f58e478d1f91', 'model_name': 'model1', 'outputs': [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 1, 1, 1, 1, 2, 1, 1, 1, 1, 1, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 2, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2]}\n"
     ]
    }
   ],
   "source": [
    "from sklearn.datasets import load_iris\n",
    "\n",
    "# set the topology/router\n",
    "graph = fn.set_topology(\"router\")\n",
    "\n",
    "# Add the model\n",
    "fn.add_model(\n",
    "    \"model1\",\n",
    "    class_name=\"ClassifierModel\",\n",
    "    model_path=\"https://s3.wasabisys.com/iguazio/models/iris/model.pkl\",\n",
    ")\n",
    "\n",
    "# Add additional models\n",
    "# fn.add_model(\"model2\", class_name=\"ClassifierModel\", model_path=\"<path2>\")\n",
    "\n",
    "# create and use the graph simulator\n",
    "server = fn.to_mock_server()\n",
    "x = load_iris()[\"data\"].tolist()\n",
    "result = server.test(\"/v2/models/model1/infer\", {\"inputs\": x})\n",
    "\n",
    "print(result)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Flow\n",
    "\n",
    "You can use the `flow` topology to specify tasks, which typically manipulates the data. The most common scenario is pre-processing of data prior to the model execution.\n",
    "\n",
    "```{note} Once the topology is set, you cannot change an existing function topology.\n",
    "```\n",
    "\n",
    "In this topology, you build and connect the graph (DAG) by adding steps using the `step.to()` method, or by using the \n",
    "`graph.add_step()` method.\n",
    "\n",
    "> The `step.to()` is typically used to chain steps together. `graph.add_step` can add steps anywhere on the\n",
    "> graph and has `before` and `after` parameters to specify the location of the step."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<!-- show example without router -->"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {
    "scrolled": true
   },
   "outputs": [
    {
     "data": {
      "image/svg+xml": [
       "<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"no\"?>\n",
       "<!DOCTYPE svg PUBLIC \"-//W3C//DTD SVG 1.1//EN\"\n",
       " \"http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd\">\n",
       "<!-- Generated by graphviz version 2.40.1 (20161225.0304)\n",
       " -->\n",
       "<!-- Title: mlrun&#45;flow Pages: 1 -->\n",
       "<svg width=\"432pt\" height=\"84pt\"\n",
       " viewBox=\"0.00 0.00 432.42 84.00\" xmlns=\"http://www.w3.org/2000/svg\" xmlns:xlink=\"http://www.w3.org/1999/xlink\">\n",
       "<g id=\"graph0\" class=\"graph\" transform=\"scale(1 1) rotate(0) translate(4 80)\">\n",
       "<title>mlrun&#45;flow</title>\n",
       "<polygon fill=\"#ffffff\" stroke=\"transparent\" points=\"-4,4 -4,-80 428.425,-80 428.425,4 -4,4\"/>\n",
       "<g id=\"clust1\" class=\"cluster\">\n",
       "<title>cluster_router</title>\n",
       "<polygon fill=\"none\" stroke=\"#000000\" points=\"215.1919,-8 215.1919,-68 416.425,-68 416.425,-8 215.1919,-8\"/>\n",
       "</g>\n",
       "<!-- _start -->\n",
       "<g id=\"node1\" class=\"node\">\n",
       "<title>_start</title>\n",
       "<polygon fill=\"#d3d3d3\" stroke=\"#000000\" points=\"38.5476,-20.0493 40.698,-20.1479 42.8263,-20.2953 44.9236,-20.4913 46.9815,-20.7353 48.9917,-21.0266 50.9463,-21.3645 52.8377,-21.7479 54.6587,-22.1759 56.4025,-22.6472 58.0628,-23.1606 59.634,-23.7147 61.1107,-24.308 62.4882,-24.9388 63.7625,-25.6054 64.9302,-26.3059 65.9882,-27.0385 66.9343,-27.8012 67.7669,-28.5918 68.4849,-29.4082 69.0878,-30.2481 69.5758,-31.1093 69.9496,-31.9894 70.2102,-32.886 70.3595,-33.7965 70.3997,-34.7186 70.3334,-35.6497 70.1636,-36.5873 69.8937,-37.5287 69.5276,-38.4713 69.0691,-39.4127 68.5225,-40.3503 67.8923,-41.2814 67.1831,-42.2035 66.3996,-43.114 65.5464,-44.0106 64.6285,-44.8907 63.6504,-45.7519 62.617,-46.5918 61.5329,-47.4082 60.4024,-48.1988 59.2299,-48.9615 58.0197,-49.6941 56.7755,-50.3946 55.5012,-51.0612 54.2002,-51.692 52.8757,-52.2853 51.5309,-52.8394 50.1684,-53.3528 48.7908,-53.8241 47.4003,-54.2521 45.9989,-54.6355 44.5886,-54.9734 43.1708,-55.2647 41.7472,-55.5087 40.3189,-55.7047 38.8872,-55.8521 37.4531,-55.9507 36.0175,-56 34.5815,-56 33.146,-55.9507 31.7119,-55.8521 30.2801,-55.7047 28.8519,-55.5087 27.4282,-55.2647 26.0105,-54.9734 24.6001,-54.6355 23.1988,-54.2521 21.8083,-53.8241 20.4306,-53.3528 19.0681,-52.8394 17.7233,-52.2853 16.3989,-51.692 15.0979,-51.0612 13.8236,-50.3946 12.5794,-49.6941 11.3691,-48.9615 10.1967,-48.1988 9.0662,-47.4082 7.982,-46.5918 6.9486,-45.7519 5.9706,-44.8907 5.0526,-44.0106 4.1995,-43.114 3.4159,-42.2035 2.7067,-41.2814 2.0765,-40.3503 1.53,-39.4127 1.0715,-38.4713 .7053,-37.5287 .4355,-36.5873 .2657,-35.6497 .1993,-34.7186 .2395,-33.7965 .3888,-32.886 .6495,-31.9894 1.0232,-31.1093 1.5112,-30.2481 2.1141,-29.4082 2.8321,-28.5918 3.6647,-27.8012 4.6109,-27.0385 5.6689,-26.3059 6.8365,-25.6054 8.1108,-24.9388 9.4884,-24.308 10.9651,-23.7147 12.5362,-23.1606 14.1966,-22.6472 15.9404,-22.1759 17.7614,-21.7479 19.6528,-21.3645 21.6074,-21.0266 23.6176,-20.7353 25.6755,-20.4913 27.7728,-20.2953 29.901,-20.1479 32.0515,-20.0493 34.2154,-20 36.3837,-20 38.5476,-20.0493\"/>\n",
       "<text text-anchor=\"middle\" x=\"35.2995\" y=\"-34.3\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"#000000\">start</text>\n",
       "</g>\n",
       "<!-- enrich -->\n",
       "<g id=\"node2\" class=\"node\">\n",
       "<title>enrich</title>\n",
       "<ellipse fill=\"none\" stroke=\"#000000\" cx=\"146.8955\" cy=\"-38\" rx=\"40.0939\" ry=\"18\"/>\n",
       "<text text-anchor=\"middle\" x=\"146.8955\" y=\"-34.3\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"#000000\">enrich</text>\n",
       "</g>\n",
       "<!-- _start&#45;&gt;enrich -->\n",
       "<g id=\"edge1\" class=\"edge\">\n",
       "<title>_start&#45;&gt;enrich</title>\n",
       "<path fill=\"none\" stroke=\"#000000\" d=\"M69.9975,-38C78.2752,-38 87.2796,-38 96.1148,-38\"/>\n",
       "<polygon fill=\"#000000\" stroke=\"#000000\" points=\"96.3553,-41.5001 106.3553,-38 96.3552,-34.5001 96.3553,-41.5001\"/>\n",
       "</g>\n",
       "<!-- router -->\n",
       "<g id=\"node3\" class=\"node\">\n",
       "<title>router</title>\n",
       "<polygon fill=\"none\" stroke=\"#000000\" points=\"314.532,-30.5442 314.532,-45.4558 288.9193,-56 252.6976,-56 227.0849,-45.4558 227.0849,-30.5442 252.6976,-20 288.9193,-20 314.532,-30.5442\"/>\n",
       "<polygon fill=\"none\" stroke=\"#000000\" points=\"318.5418,-27.8677 318.5418,-48.1323 289.7142,-60 251.9027,-60 223.0751,-48.1323 223.0751,-27.8677 251.9027,-16 289.7142,-16 318.5418,-27.8677\"/>\n",
       "<text text-anchor=\"middle\" x=\"270.8084\" y=\"-34.3\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"#000000\">router</text>\n",
       "</g>\n",
       "<!-- enrich&#45;&gt;router -->\n",
       "<g id=\"edge3\" class=\"edge\">\n",
       "<title>enrich&#45;&gt;router</title>\n",
       "<path fill=\"none\" stroke=\"#000000\" d=\"M187.4692,-38C195.498,-38 204.0728,-38 212.5547,-38\"/>\n",
       "<polygon fill=\"#000000\" stroke=\"#000000\" points=\"212.8118,-41.5001 222.8118,-38 212.8118,-34.5001 212.8118,-41.5001\"/>\n",
       "</g>\n",
       "<!-- router/m1 -->\n",
       "<g id=\"node4\" class=\"node\">\n",
       "<title>router/m1</title>\n",
       "<ellipse fill=\"none\" stroke=\"#000000\" cx=\"381.425\" cy=\"-38\" rx=\"27\" ry=\"18\"/>\n",
       "<text text-anchor=\"middle\" x=\"381.425\" y=\"-34.3\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"#000000\">m1</text>\n",
       "</g>\n",
       "<!-- router&#45;&gt;router/m1 -->\n",
       "<g id=\"edge2\" class=\"edge\">\n",
       "<title>router&#45;&gt;router/m1</title>\n",
       "<path fill=\"none\" stroke=\"#000000\" d=\"M318.6831,-38C327.1277,-38 335.8077,-38 343.9164,-38\"/>\n",
       "<polygon fill=\"#000000\" stroke=\"#000000\" points=\"344.1792,-41.5001 354.1791,-38 344.1791,-34.5001 344.1792,-41.5001\"/>\n",
       "</g>\n",
       "</g>\n",
       "</svg>\n"
      ],
      "text/plain": [
       "<graphviz.dot.Digraph at 0x7fd46e4dda50>"
      ]
     },
     "execution_count": 18,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "fn2 = mlrun.code_to_function(\n",
    "    \"serving_example_flow\", kind=\"serving\", image=\"mlrun/mlrun\"\n",
    ")\n",
    "\n",
    "graph2 = fn2.set_topology(\"flow\")\n",
    "\n",
    "graph2_enrich = graph2.to(\"storey.Extend\", name=\"enrich\", _fn='({\"tag\": \"something\"})')\n",
    "\n",
    "# add an Ensemble router with two child models (routes)\n",
    "router = graph2.add_step(mlrun.serving.ModelRouter(), name=\"router\", after=\"enrich\")\n",
    "router.add_route(\n",
    "    \"m1\",\n",
    "    class_name=\"ClassifierModel\",\n",
    "    model_path=\"https://s3.wasabisys.com/iguazio/models/iris/model.pkl\",\n",
    ")\n",
    "router.respond()\n",
    "\n",
    "# add an error handling step, run only when/if the \"pre-process\" step fails\n",
    "graph.to(name=\"pre-process\", handler=\"raising_step\").error_handler(\n",
    "    name=\"catcher\", handler=\"handle_error\", full_event=True\n",
    ")\n",
    "\n",
    "# Add additional models\n",
    "# router.add_route(\"m2\", class_name=\"ClassifierModel\", model_path=path2)\n",
    "\n",
    "# plot the graph (using Graphviz)\n",
    "graph2.plot(rankdir=\"LR\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "> 2021-11-02 04:18:42,142 [info] model m1 was loaded\n",
      "> 2021-11-02 04:18:42,142 [info] Initializing endpoint records\n",
      "> 2021-11-02 04:18:42,183 [info] Loaded ['m1']\n",
      "{'id': 'f713fd7eedeb431eba101b13c53a15b5'}\n"
     ]
    }
   ],
   "source": [
    "fn2_server = fn2.to_mock_server()\n",
    "\n",
    "result = fn2_server.test(\"/v2/models/m1/infer\", {\"inputs\": x})\n",
    "\n",
    "print(result)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Remote execution\n",
    "\n",
    "You can chain functions together with remote execution. This allows you to:\n",
    "- Call existing functions from the graph and reuse them from other graphs.\n",
    "- Scale up and down different components individually.\n",
    "\n",
    "Calling a remote function can either use HTTP or via a queue (streaming)."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### HTTP\n",
    "\n",
    "Calling a function using http uses the special `$remote` class. First deploy the remote function:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "> 2022-03-17 08:20:40,674 [info] Starting remote function deploy\n",
      "2022-03-17 08:20:40  (info) Deploying function\n",
      "2022-03-17 08:20:40  (info) Building\n",
      "2022-03-17 08:20:40  (info) Staging files and preparing base images\n",
      "2022-03-17 08:20:40  (info) Building processor image\n",
      "2022-03-17 08:20:42  (info) Build complete\n",
      "2022-03-17 08:20:47  (info) Function deploy complete\n",
      "> 2022-03-17 08:20:48,289 [info] successfully deployed function: {'internal_invocation_urls': ['nuclio-graph-basic-concepts-serving-example-flow.default-tenant.svc.cluster.local:8080'], 'external_invocation_urls': ['graph-basic-concepts-serving-example-flow-graph-basic-concepts.default-tenant.app.maor-gcp2.iguazio-cd0.com/']}\n"
     ]
    }
   ],
   "source": [
    "remote_func_name = \"serving-example-flow\"\n",
    "project_name = \"graph-basic-concepts\"\n",
    "fn_remote = mlrun.code_to_function(\n",
    "    remote_func_name, project=project_name, kind=\"serving\", image=\"mlrun/mlrun\"\n",
    ")\n",
    "\n",
    "fn_remote.add_model(\n",
    "    \"model1\",\n",
    "    class_name=\"ClassifierModel\",\n",
    "    model_path=\"https://s3.wasabisys.com/iguazio/models/iris/model.pkl\",\n",
    ")\n",
    "\n",
    "remote_addr = fn_remote.deploy()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Create a new function with a graph and call the remote function above:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "image/svg+xml": [
       "<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"no\"?>\n",
       "<!DOCTYPE svg PUBLIC \"-//W3C//DTD SVG 1.1//EN\"\n",
       " \"http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd\">\n",
       "<!-- Generated by graphviz version 2.40.1 (20161225.0304)\n",
       " -->\n",
       "<!-- Title: mlrun&#45;flow Pages: 1 -->\n",
       "<svg width=\"370pt\" height=\"44pt\"\n",
       " viewBox=\"0.00 0.00 370.28 44.00\" xmlns=\"http://www.w3.org/2000/svg\" xmlns:xlink=\"http://www.w3.org/1999/xlink\">\n",
       "<g id=\"graph0\" class=\"graph\" transform=\"scale(1 1) rotate(0) translate(4 40)\">\n",
       "<title>mlrun&#45;flow</title>\n",
       "<polygon fill=\"#ffffff\" stroke=\"transparent\" points=\"-4,4 -4,-40 366.2796,-40 366.2796,4 -4,4\"/>\n",
       "<!-- _start -->\n",
       "<g id=\"node1\" class=\"node\">\n",
       "<title>_start</title>\n",
       "<polygon fill=\"#d3d3d3\" stroke=\"#000000\" points=\"38.5476,-.0493 40.698,-.1479 42.8263,-.2953 44.9236,-.4913 46.9815,-.7353 48.9917,-1.0266 50.9463,-1.3645 52.8377,-1.7479 54.6587,-2.1759 56.4025,-2.6472 58.0628,-3.1606 59.634,-3.7147 61.1107,-4.308 62.4882,-4.9388 63.7625,-5.6054 64.9302,-6.3059 65.9882,-7.0385 66.9343,-7.8012 67.7669,-8.5918 68.4849,-9.4082 69.0878,-10.2481 69.5758,-11.1093 69.9496,-11.9894 70.2102,-12.886 70.3595,-13.7965 70.3997,-14.7186 70.3334,-15.6497 70.1636,-16.5873 69.8937,-17.5287 69.5276,-18.4713 69.0691,-19.4127 68.5225,-20.3503 67.8923,-21.2814 67.1831,-22.2035 66.3996,-23.114 65.5464,-24.0106 64.6285,-24.8907 63.6504,-25.7519 62.617,-26.5918 61.5329,-27.4082 60.4024,-28.1988 59.2299,-28.9615 58.0197,-29.6941 56.7755,-30.3946 55.5012,-31.0612 54.2002,-31.692 52.8757,-32.2853 51.5309,-32.8394 50.1684,-33.3528 48.7908,-33.8241 47.4003,-34.2521 45.9989,-34.6355 44.5886,-34.9734 43.1708,-35.2647 41.7472,-35.5087 40.3189,-35.7047 38.8872,-35.8521 37.4531,-35.9507 36.0175,-36 34.5815,-36 33.146,-35.9507 31.7119,-35.8521 30.2801,-35.7047 28.8519,-35.5087 27.4282,-35.2647 26.0105,-34.9734 24.6001,-34.6355 23.1988,-34.2521 21.8083,-33.8241 20.4306,-33.3528 19.0681,-32.8394 17.7233,-32.2853 16.3989,-31.692 15.0979,-31.0612 13.8236,-30.3946 12.5794,-29.6941 11.3691,-28.9615 10.1967,-28.1988 9.0662,-27.4082 7.982,-26.5918 6.9486,-25.7519 5.9706,-24.8907 5.0526,-24.0106 4.1995,-23.114 3.4159,-22.2035 2.7067,-21.2814 2.0765,-20.3503 1.53,-19.4127 1.0715,-18.4713 .7053,-17.5287 .4355,-16.5873 .2657,-15.6497 .1993,-14.7186 .2395,-13.7965 .3888,-12.886 .6495,-11.9894 1.0232,-11.1093 1.5112,-10.2481 2.1141,-9.4082 2.8321,-8.5918 3.6647,-7.8012 4.6109,-7.0385 5.6689,-6.3059 6.8365,-5.6054 8.1108,-4.9388 9.4884,-4.308 10.9651,-3.7147 12.5362,-3.1606 14.1966,-2.6472 15.9404,-2.1759 17.7614,-1.7479 19.6528,-1.3645 21.6074,-1.0266 23.6176,-.7353 25.6755,-.4913 27.7728,-.2953 29.901,-.1479 32.0515,-.0493 34.2154,0 36.3837,0 38.5476,-.0493\"/>\n",
       "<text text-anchor=\"middle\" x=\"35.2995\" y=\"-14.3\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"#000000\">start</text>\n",
       "</g>\n",
       "<!-- enrich -->\n",
       "<g id=\"node2\" class=\"node\">\n",
       "<title>enrich</title>\n",
       "<ellipse fill=\"none\" stroke=\"#000000\" cx=\"146.8955\" cy=\"-18\" rx=\"40.0939\" ry=\"18\"/>\n",
       "<text text-anchor=\"middle\" x=\"146.8955\" y=\"-14.3\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"#000000\">enrich</text>\n",
       "</g>\n",
       "<!-- _start&#45;&gt;enrich -->\n",
       "<g id=\"edge1\" class=\"edge\">\n",
       "<title>_start&#45;&gt;enrich</title>\n",
       "<path fill=\"none\" stroke=\"#000000\" d=\"M69.9975,-18C78.2752,-18 87.2796,-18 96.1148,-18\"/>\n",
       "<polygon fill=\"#000000\" stroke=\"#000000\" points=\"96.3553,-21.5001 106.3553,-18 96.3552,-14.5001 96.3553,-21.5001\"/>\n",
       "</g>\n",
       "<!-- remote_func -->\n",
       "<g id=\"node3\" class=\"node\">\n",
       "<title>remote_func</title>\n",
       "<ellipse fill=\"none\" stroke=\"#000000\" cx=\"292.7357\" cy=\"-18\" rx=\"69.5877\" ry=\"18\"/>\n",
       "<text text-anchor=\"middle\" x=\"292.7357\" y=\"-14.3\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"#000000\">remote_func</text>\n",
       "</g>\n",
       "<!-- enrich&#45;&gt;remote_func -->\n",
       "<g id=\"edge2\" class=\"edge\">\n",
       "<title>enrich&#45;&gt;remote_func</title>\n",
       "<path fill=\"none\" stroke=\"#000000\" d=\"M187.5201,-18C195.4559,-18 204.0312,-18 212.7424,-18\"/>\n",
       "<polygon fill=\"#000000\" stroke=\"#000000\" points=\"213.0009,-21.5001 223.0008,-18 213.0008,-14.5001 213.0009,-21.5001\"/>\n",
       "</g>\n",
       "</g>\n",
       "</svg>\n"
      ],
      "text/plain": [
       "<graphviz.dot.Digraph at 0x7f57dc96a0d0>"
      ]
     },
     "execution_count": 6,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "fn_preprocess = mlrun.new_function(\"preprocess\", kind=\"serving\")\n",
    "graph_preprocessing = fn_preprocess.set_topology(\"flow\")\n",
    "\n",
    "graph_preprocessing.to(\"storey.Extend\", name=\"enrich\", _fn='({\"tag\": \"something\"})').to(\n",
    "    \"$remote\", \"remote_func\", url=f\"{remote_addr}v2/models/model1/infer\", method=\"put\"\n",
    ").respond()\n",
    "\n",
    "graph_preprocessing.plot(rankdir=\"LR\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "> 2022-03-17 08:20:48,374 [warning] run command, file or code were not specified\n",
      "{'id': '3a1dd36c-e7de-45af-a0c4-72e3163ba92a', 'model_name': 'model1', 'outputs': [0, 2]}\n"
     ]
    }
   ],
   "source": [
    "fn3_server = fn_preprocess.to_mock_server()\n",
    "my_data = \"\"\"{\"inputs\":[[5.1, 3.5, 1.4, 0.2],[7.7, 3.8, 6.7, 2.2]]}\"\"\"\n",
    "result = fn3_server.test(\"/v2/models/my_model/infer\", body=my_data)\n",
    "print(result)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Queue (streaming)\n",
    "\n",
    "You can use queues to send events from one part of the graph to another and to decouple the processing of those parts.\n",
    "Queues are better suited to deal with bursts of events, since all the events are stored in the queue until they are processed."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "(serving-v3io-stream-example)=\n",
    "#### V3IO stream example\n",
    "The example below uses a V3IO stream, which is a fast real-time implementation of a stream that allows processing of events at very low latency."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Overwriting echo.py\n"
     ]
    }
   ],
   "source": [
    "%%writefile echo.py\n",
    "def echo_handler(x):\n",
    "    print(x)\n",
    "    return x"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Configure the streams"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "\n",
    "streams_prefix = (\n",
    "    f\"v3io:///users/{os.getenv('V3IO_USERNAME')}/examples/graph-basic-concepts\"\n",
    ")\n",
    "\n",
    "input_stream = streams_prefix + \"/in-stream\"\n",
    "out_stream = streams_prefix + \"/out-stream\"\n",
    "err_stream = streams_prefix + \"/err-stream\""
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Alternativey, use Kafka to configure the streams:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "kafka_prefix = f\"kafka://{broker}/\"\n",
    "internal_topic = kafka_prefix + \"in-topic\"\n",
    "out_topic = kafka_prefix + \"out-topic\"\n",
    "err_topic = kafka_prefix + \"err-topic\""
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Create the graph. In the `to` method the class name is one of `>>` or `$queue` to specify that this is a queue. To configure a consumer group for the step, include the group in the `to` method."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "image/svg+xml": [
       "<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"no\"?>\n",
       "<!DOCTYPE svg PUBLIC \"-//W3C//DTD SVG 1.1//EN\"\n",
       " \"http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd\">\n",
       "<!-- Generated by graphviz version 2.40.1 (20161225.0304)\n",
       " -->\n",
       "<!-- Title: mlrun&#45;flow Pages: 1 -->\n",
       "<svg width=\"602pt\" height=\"44pt\"\n",
       " viewBox=\"0.00 0.00 602.19 44.00\" xmlns=\"http://www.w3.org/2000/svg\" xmlns:xlink=\"http://www.w3.org/1999/xlink\">\n",
       "<g id=\"graph0\" class=\"graph\" transform=\"scale(1 1) rotate(0) translate(4 40)\">\n",
       "<title>mlrun&#45;flow</title>\n",
       "<polygon fill=\"#ffffff\" stroke=\"transparent\" points=\"-4,4 -4,-40 598.1861,-40 598.1861,4 -4,4\"/>\n",
       "<!-- _start -->\n",
       "<g id=\"node1\" class=\"node\">\n",
       "<title>_start</title>\n",
       "<polygon fill=\"#d3d3d3\" stroke=\"#000000\" points=\"38.5476,-.0493 40.698,-.1479 42.8263,-.2953 44.9236,-.4913 46.9815,-.7353 48.9917,-1.0266 50.9463,-1.3645 52.8377,-1.7479 54.6587,-2.1759 56.4025,-2.6472 58.0628,-3.1606 59.634,-3.7147 61.1107,-4.308 62.4882,-4.9388 63.7625,-5.6054 64.9302,-6.3059 65.9882,-7.0385 66.9343,-7.8012 67.7669,-8.5918 68.4849,-9.4082 69.0878,-10.2481 69.5758,-11.1093 69.9496,-11.9894 70.2102,-12.886 70.3595,-13.7965 70.3997,-14.7186 70.3334,-15.6497 70.1636,-16.5873 69.8937,-17.5287 69.5276,-18.4713 69.0691,-19.4127 68.5225,-20.3503 67.8923,-21.2814 67.1831,-22.2035 66.3996,-23.114 65.5464,-24.0106 64.6285,-24.8907 63.6504,-25.7519 62.617,-26.5918 61.5329,-27.4082 60.4024,-28.1988 59.2299,-28.9615 58.0197,-29.6941 56.7755,-30.3946 55.5012,-31.0612 54.2002,-31.692 52.8757,-32.2853 51.5309,-32.8394 50.1684,-33.3528 48.7908,-33.8241 47.4003,-34.2521 45.9989,-34.6355 44.5886,-34.9734 43.1708,-35.2647 41.7472,-35.5087 40.3189,-35.7047 38.8872,-35.8521 37.4531,-35.9507 36.0175,-36 34.5815,-36 33.146,-35.9507 31.7119,-35.8521 30.2801,-35.7047 28.8519,-35.5087 27.4282,-35.2647 26.0105,-34.9734 24.6001,-34.6355 23.1988,-34.2521 21.8083,-33.8241 20.4306,-33.3528 19.0681,-32.8394 17.7233,-32.2853 16.3989,-31.692 15.0979,-31.0612 13.8236,-30.3946 12.5794,-29.6941 11.3691,-28.9615 10.1967,-28.1988 9.0662,-27.4082 7.982,-26.5918 6.9486,-25.7519 5.9706,-24.8907 5.0526,-24.0106 4.1995,-23.114 3.4159,-22.2035 2.7067,-21.2814 2.0765,-20.3503 1.53,-19.4127 1.0715,-18.4713 .7053,-17.5287 .4355,-16.5873 .2657,-15.6497 .1993,-14.7186 .2395,-13.7965 .3888,-12.886 .6495,-11.9894 1.0232,-11.1093 1.5112,-10.2481 2.1141,-9.4082 2.8321,-8.5918 3.6647,-7.8012 4.6109,-7.0385 5.6689,-6.3059 6.8365,-5.6054 8.1108,-4.9388 9.4884,-4.308 10.9651,-3.7147 12.5362,-3.1606 14.1966,-2.6472 15.9404,-2.1759 17.7614,-1.7479 19.6528,-1.3645 21.6074,-1.0266 23.6176,-.7353 25.6755,-.4913 27.7728,-.2953 29.901,-.1479 32.0515,-.0493 34.2154,0 36.3837,0 38.5476,-.0493\"/>\n",
       "<text text-anchor=\"middle\" x=\"35.2995\" y=\"-14.3\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"#000000\">start</text>\n",
       "</g>\n",
       "<!-- enrich -->\n",
       "<g id=\"node2\" class=\"node\">\n",
       "<title>enrich</title>\n",
       "<ellipse fill=\"none\" stroke=\"#000000\" cx=\"146.8955\" cy=\"-18\" rx=\"40.0939\" ry=\"18\"/>\n",
       "<text text-anchor=\"middle\" x=\"146.8955\" y=\"-14.3\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"#000000\">enrich</text>\n",
       "</g>\n",
       "<!-- _start&#45;&gt;enrich -->\n",
       "<g id=\"edge1\" class=\"edge\">\n",
       "<title>_start&#45;&gt;enrich</title>\n",
       "<path fill=\"none\" stroke=\"#000000\" d=\"M69.9975,-18C78.2752,-18 87.2796,-18 96.1148,-18\"/>\n",
       "<polygon fill=\"#000000\" stroke=\"#000000\" points=\"96.3553,-21.5001 106.3553,-18 96.3552,-14.5001 96.3553,-21.5001\"/>\n",
       "</g>\n",
       "<!-- input_stream -->\n",
       "<g id=\"node3\" class=\"node\">\n",
       "<title>input_stream</title>\n",
       "<polygon fill=\"none\" stroke=\"#000000\" points=\"323.1919,-30 223.1919,-30 223.1919,-6 323.1919,-6 335.1919,-18 323.1919,-30\"/>\n",
       "<text text-anchor=\"middle\" x=\"279.1919\" y=\"-14.3\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"#000000\">input_stream</text>\n",
       "</g>\n",
       "<!-- enrich&#45;&gt;input_stream -->\n",
       "<g id=\"edge2\" class=\"edge\">\n",
       "<title>enrich&#45;&gt;input_stream</title>\n",
       "<path fill=\"none\" stroke=\"#000000\" d=\"M187.3076,-18C195.3909,-18 204.0816,-18 212.7784,-18\"/>\n",
       "<polygon fill=\"#000000\" stroke=\"#000000\" points=\"212.9545,-21.5001 222.9544,-18 212.9544,-14.5001 212.9545,-21.5001\"/>\n",
       "</g>\n",
       "<!-- echo -->\n",
       "<g id=\"node4\" class=\"node\">\n",
       "<title>echo</title>\n",
       "<ellipse fill=\"none\" stroke=\"#000000\" cx=\"403.689\" cy=\"-18\" rx=\"32.4942\" ry=\"18\"/>\n",
       "<text text-anchor=\"middle\" x=\"403.689\" y=\"-14.3\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"#000000\">echo</text>\n",
       "</g>\n",
       "<!-- input_stream&#45;&gt;echo -->\n",
       "<g id=\"edge3\" class=\"edge\">\n",
       "<title>input_stream&#45;&gt;echo</title>\n",
       "<path fill=\"none\" stroke=\"#000000\" d=\"M335.249,-18C343.8712,-18 352.6535,-18 360.9252,-18\"/>\n",
       "<polygon fill=\"#000000\" stroke=\"#000000\" points=\"361.0793,-21.5001 371.0793,-18 361.0792,-14.5001 361.0793,-21.5001\"/>\n",
       "</g>\n",
       "<!-- output_stream -->\n",
       "<g id=\"node5\" class=\"node\">\n",
       "<title>output_stream</title>\n",
       "<polygon fill=\"none\" stroke=\"#000000\" points=\"582.1861,-30 472.1861,-30 472.1861,-6 582.1861,-6 594.1861,-18 582.1861,-30\"/>\n",
       "<text text-anchor=\"middle\" x=\"533.1861\" y=\"-14.3\" font-family=\"Times,serif\" font-size=\"14.00\" fill=\"#000000\">output_stream</text>\n",
       "</g>\n",
       "<!-- echo&#45;&gt;output_stream -->\n",
       "<g id=\"edge4\" class=\"edge\">\n",
       "<title>echo&#45;&gt;output_stream</title>\n",
       "<path fill=\"none\" stroke=\"#000000\" d=\"M436.3666,-18C444.2602,-18 453.0047,-18 461.9156,-18\"/>\n",
       "<polygon fill=\"#000000\" stroke=\"#000000\" points=\"462.0105,-21.5001 472.0105,-18 462.0105,-14.5001 462.0105,-21.5001\"/>\n",
       "</g>\n",
       "</g>\n",
       "</svg>\n"
      ],
      "text/plain": [
       "<graphviz.dot.Digraph at 0x7f57c7907990>"
      ]
     },
     "execution_count": 10,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "fn_preprocess2 = mlrun.new_function(\"preprocess\", kind=\"serving\")\n",
    "fn_preprocess2.add_child_function(\"echo_func\", \"./echo.py\", \"mlrun/mlrun\")\n",
    "\n",
    "graph_preprocess2 = fn_preprocess2.set_topology(\"flow\")\n",
    "\n",
    "graph_preprocess2.to(\"storey.Extend\", name=\"enrich\", _fn='({\"tag\": \"something\"})').to(\n",
    "    \">>\", \"input_stream\", path=input_stream, group=\"mygroup\"\n",
    ").to(name=\"echo\", handler=\"echo_handler\", function=\"echo_func\").to(\n",
    "    \">>\", \"output_stream\", path=out_stream, sharding_func=\"partition\"\n",
    ")\n",
    "\n",
    "graph_preprocess2.plot(rankdir=\"LR\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "> 2022-03-17 08:20:55,182 [warning] run command, file or code were not specified\n",
      "{'id': 'a6efe8217b024ec7a7e02cf0b7850b91'}\n",
      "{'inputs': [[5.1, 3.5, 1.4, 0.2], [7.7, 3.8, 6.7, 2.2]], 'tag': 'something'}\n"
     ]
    }
   ],
   "source": [
    "from echo import *\n",
    "\n",
    "fn4_server = fn_preprocess2.to_mock_server(current_function=\"*\")\n",
    "\n",
    "my_data = \"\"\"{\"inputs\": [[5.1, 3.5, 1.4, 0.2], [7.7, 3.8, 6.7, 2.2]], \"partition\": 0}\"\"\"\n",
    "\n",
    "result = fn4_server.test(\"/v2/models/my_model/infer\", body=my_data)\n",
    "\n",
    "print(result)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "(serving-kafka-stream-example)=\n",
    "#### Kafka stream example"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Overwriting echo.py\n"
     ]
    }
   ],
   "source": [
    "%%writefile echo.py\n",
    "def echo_handler(x):\n",
    "    print(x)\n",
    "    return x"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Configure the streams"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "\n",
    "input_topic = \"in-topic\"\n",
    "out_topic = \"out-topic\"\n",
    "err_topic = \"err-topic\"\n",
    "\n",
    "# replace this\n",
    "brokers = \"<broker IP>\""
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Create the graph. In the `to` method the class name is one of `>>` or `$queue` to specify that this is a queue. To configure a consumer group for the step, include the group in the `to` method."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import mlrun\n",
    "\n",
    "fn_preprocess2 = mlrun.new_function(\"preprocess\", kind=\"serving\")\n",
    "fn_preprocess2.add_child_function(\"echo_func\", \"./echo.py\", \"mlrun/mlrun\")\n",
    "\n",
    "graph_preprocess2 = fn_preprocess2.set_topology(\"flow\")\n",
    "\n",
    "graph_preprocess2.to(\"storey.Extend\", name=\"enrich\", _fn='({\"tag\": \"something\"})').to(\n",
    "    \">>\",\n",
    "    \"input_stream\",\n",
    "    path=input_topic,\n",
    "    group=\"mygroup\",\n",
    "    kafka_bootstrap_servers=brokers,\n",
    ").to(name=\"echo\", handler=\"echo_handler\", function=\"echo_func\").to(\n",
    "    \">>\", \"output_stream\", path=out_topic, kafka_bootstrap_servers=brokers\n",
    ")\n",
    "\n",
    "graph_preprocess2.plot(rankdir=\"LR\")\n",
    "\n",
    "from echo import *\n",
    "\n",
    "fn4_server = fn_preprocess2.to_mock_server(current_function=\"*\")\n",
    "\n",
    "fn4_server.set_error_stream(f\"kafka://{brokers}/{err_topic}\")\n",
    "\n",
    "my_data = \"\"\"{\"inputs\":[[5.1, 3.5, 1.4, 0.2],[7.7, 3.8, 6.7, 2.2]]}\"\"\"\n",
    "\n",
    "result = fn4_server.test(\"/v2/models/my_model/infer\", body=my_data)\n",
    "\n",
    "print(result)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Examples of graph functionality"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### NLP processing pipeline with real-time streaming \n",
    "\n",
    "In some cases it's useful to split your processing to multiple functions and use \n",
    "streaming protocols to connect those functions. \n",
    "\n",
    "See the [full notebook example](./distributed-graph.html), where the data processing is in the first function/container and the NLP processing is in the second function. And the second function contains the GPU."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Currently queues support Iguazio v3io and Kafka streams."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Graph that splits and rejoins\n",
    "\n",
    "You can define a graph that splits into two parallel steps, and the output of both steps join back together. \n",
    "\n",
    "In this basic example, all input goes into both stepA and stepB, and then both stepA and stepB forward the input to stepC. \n",
    "This means that a dataset of 5 rows generates an output of 10 rows (barring any filtering or other processing that \n",
    "would change the number of rows).\n",
    "\n",
    "```{admonition} Note\n",
    "Use this configuration to join the graph branches and **not** to join the events into a single large one.\n",
    "``` \n",
    "\n",
    "Example:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "graph.to(\"stepB\")\n",
    "graph.to(\"stepC\")\n",
    "graph.add_step(name=\"stepD\", after=[\"stepB\", \"stepC\"])\n",
    "\n",
    "\n",
    "graph = fn.set_topology(\"flow\", exist_ok=True)\n",
    "dbl = graph.to(name=\"double\", handler=\"double\")\n",
    "dbl.to(name=\"add3\", class_name=\"Adder\", add=3)\n",
    "dbl.to(name=\"add2\", class_name=\"Adder\", add=2)\n",
    "graph.add_step(\"Gather\").after(\"add2\", \"add3\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Graphs that split and rejoin can also be used for these types of scenarios:\n",
    "- Steps B and C are filter steps that complement each other. For example B passes events where key < X, and C passes events where key >= X. The resulting DF contains the exact event ingested, since each event was handled once on one of the branches.\n",
    "- Steps B and C modify the content of the event in different ways. B adds a column col1 with value X, and C adds a column col2 with value X. The resulting DF contains both col1 and col2. Each key is represented twice: once with col1 == X, col2 == null and once with col1 == null, col2 == X."
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.7"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
